python多进程

  • 进程:操作系统进行资源分配基本单位,是程序的基本执行实体,是线程的容器。进程拥有自己独立的数据,CPU时间和内存资源。
  • 多进程:多个进程协作同时处理一个任务。

多进程的实现

os.fork()

类unix可用

1
2
3
4
5
6
7
import os
print('Process %s start...'%os.getpid())
pid = os.fork()
if pid == 0:
print('I am child process %s and my parent is %s.'%(os.getpid(),os.getppid()))
else:
print('I %s just create a child process %s.'%(os.getpid(),pid))

multiprocessing

跨平台的多进程实现模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Process
import os

def runproc(name):
print('运行子进程%s(%s)'%(name, os.getpid()))

if __name__ == '__main__':
print('父进程%s.'% os.getpid())
p = Process(target=runproc, args=('test',))
p.start()
print('子进程启动')
p.join() # 等待子进程结束过后再执行后面的语句
print('进程结束')

=================== RESTART: 多进程.py ===================
父进程7228.
子进程启动
进程结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from multiprocessing import Pool
# 对于同时要启动很多进程的情况
import os, time, random

def long_time_task(name):
print('运行任务:%s(%s)'%(name,os.getpid()))
start = time.time()
time.sleep(random.random()*3)
end = time.time()
print('任务 %s 运行了 %0.2f 秒'%(name, (end-start)))

if __name__ == '__main__':
print('父进程%s启动'%os.getpid())
p = Pool() # 可以传参设置进程池的大小,默认和CPU核数有关
for i in range(5):
p.apply_async(long_time_task, args=(i,)) #异步启动
print('等待所有子进程结束')
# 关闭进程池,不可再添加新进程
p.close()
# 等待所有子进程执行完毕
p.join()
print('全部的子进程都已结束')

=================== ===================
父进程13412启动
等待所有子进程结束
运行任务:0(7496)
运行任务:1(19860)
运行任务:2(24540)
运行任务:3(1172)
任务 3 运行了 0.04
运行任务:4(1172)
任务 1 运行了 0.80
任务 0 运行了 2.41
任务 2 运行了 2.66
任务 4 运行了 2.82
全部的子进程都已结束

os.system

返回命令执行状态码,而将命令执行结果输出到屏幕

推荐使用subprocess来替代

1
2
3
4
5
In [26]: a = os.system('date /T')
2019/03/12 周二

In [27]: a
Out[27]: 0

os.popen

可以获取命令执行结果,但是无法获取命令执行状态码

推荐使用subprocess来替代

1
2
3
4
In [30]: a = os.popen('date /T').read()

In [31]: a
Out[31]: '2019/03/12 周二 \n'

subprocess

参考:https://www.cnblogs.com/yyds/p/7288916.html

函数 描述
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False, universal_newlines=False) Python 3.5中新增的函数。执行指定的命令,等待命令执行完成后返回一个包含执行结果的CompletedProcess类的实例。
subprocess.call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None) 执行指定的命令,返回命令执行状态,其功能类似于os.system(cmd)。
subprocess.check_call(args, *, stdin=None, stdout=None, stderr=None, shell=False, timeout=None) Python 2.5中新增的函数。 执行指定的命令,如果执行成功则返回状态码,否则抛出异常。其功能等价于subprocess.run(…, check=True)。
subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None) Python 2.7中新增的的函数。执行指定的命令,如果执行状态码为0则返回命令执行结果,否则抛出异常。
subprocess.getoutput(cmd) 接收字符串格式的命令,执行命令并返回执行结果,其功能类似于os.popen(cmd).read()和commands.getoutput(cmd)。
subprocess.getstatusoutput(cmd) 执行cmd命令,返回一个元组(命令执行状态, 命令执行结果输出),其功能类似于commands.getstatusoutput()。
  1. 在Python 3.5之后的版本中,官方文档中提倡通过subprocess.run()函数替代其他函数来使用subproccess模块的功能;
  2. 在Python 3.5之前的版本中,我们可以通过subprocess.call(),subprocess.getoutput()等上面列出的其他函数来使用subprocess模块的功能;
  3. subprocess.run()、subprocess.call()、subprocess.check_call()和subprocess.check_output()都是通过对subprocess.Popen的封装来实现的高级函数,因此如果我们需要更复杂功能时,可以通过subprocess.Popen来完成。
  4. subprocess.getoutput()和subprocess.getstatusoutput()函数是来自Python 2.x的commands模块的两个遗留函数。它们隐式的调用系统shell,并且不保证其他函数所具有的安全性和异常处理的一致性。另外,它们从Python 3.3.4开始才支持Windows平台。

参数说明:

  • args: 要执行的shell命令,默认应该是一个字符串序列,如[‘df’, ‘-Th’]或(‘df’, ‘-Th’),也可以是一个字符串,如’df -Th’,但是此时需要把shell参数的值置为True。
  • shell: 如果shell为True,那么指定的命令将通过shell执行。如果我们需要访问某些shell的特性,如管道、文件名通配符、环境变量扩展功能,这将是非常有用的。当然,python本身也提供了许多类似shell的特性的实现,如glob、fnmatch、os.walk()、os.path.expandvars()、os.expanduser()和shutil等。
  • check: 如果check参数的值是True,且执行命令的进程以非0状态码退出,则会抛出一个CalledProcessError的异常,且该异常对象会包含 参数、退出状态码、以及stdout和stderr(如果它们有被捕获的话)。
  • stdout, stderr:
    • run()函数默认不会捕获命令执行结果的正常输出和错误输出,如果我们向获取这些内容需要传递subprocess.PIPE,然后可以通过返回的CompletedProcess类实例的stdout和stderr属性或捕获相应的内容;
    • call()和check_call()函数返回的是命令执行的状态码,而不是CompletedProcess类实例,所以对于它们而言,stdout和stderr不适合赋值为subprocess.PIPE;
    • check_output()函数默认就会返回命令执行结果,所以不用设置stdout的值,如果我们希望在结果中捕获错误信息,可以执行stderr=subprocess.STDOUT。
  • input: 该参数是传递给Popen.communicate(),通常该参数的值必须是一个字节序列,如果universal_newlines=True,则其值应该是一个字符串。
  • universal_newlines: 该参数影响的是输入与输出的数据格式,比如它的值默认为False,此时stdout和stderr的输出是字节序列;当该参数的值设置为True时,stdout和stderr的输出是字符串。
1
2
3
4
5
6
7
8
9
10
11
>>> subprocess.run(["ls", "-l"])  # doesn't capture output
CompletedProcess(args=['ls', '-l'], returncode=0)

>>> subprocess.run("exit 1", shell=True, check=True)
Traceback (most recent call last):
...
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1

>>> subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE)
CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0,
stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null\n')

subprocess.run()函数是Python3.5中新增的一个高级函数,其返回值是一个subprocess.CompletedPorcess类的实例,因此,subprocess.completedPorcess类也是Python 3.5中才存在的。它表示的是一个已结束进程的状态信息,它所包含的属性如下:

  • args: 用于加载该进程的参数,这可能是一个列表或一个字符串
  • returncode: 子进程的退出状态码。通常情况下,退出状态码为0则表示进程成功运行了;一个负值-N表示这个子进程被信号N终止了
  • stdout: 从子进程捕获的stdout。这通常是一个字节序列,如果run()函数被调用时指定universal_newlines=True,则该属性值是一个字符串。如果run()函数被调用时指定stderr=subprocess.STDOUT,那么stdout和stderr将会被整合到这一个属性中,且stderr将会为None
  • stderr: 从子进程捕获的stderr。它的值与stdout一样,是一个字节序列或一个字符串。如果stderr灭有被捕获的话,它的值就为None
  • check_returncode(): 如果returncode是一个非0值,则该方法会抛出一个CalledProcessError异常。

多进程间的通讯

  • queue 队列是进程安全的
  • pipe 管道是进程非安全的
  • lock 锁机制保障进程有序

1. Queue

相当于创建了进程间的特殊共享内存区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process, Queue

def f(queue):
queue.put([42,None,'hello'])

if __name__ == '__main__':
queue = Queue()
subprocess = Process(target=f, args=(queue,))
subprocess.start()
print(queue.get())
subprocess.join()

======================================
[42, None, 'hello']
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)

if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

2. Pipe

一管两头,一头发送数据,另一头接收

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process,Pipe

def f(conn):
conn.send([42,None,'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
subprocess = Process(target=f,args=(child_conn,))
subprocess.start()
print(parent_conn.recv())
subprocess.join()

3. Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process,Lock

def f(l,i):
l.acquire()
print('已经解锁,正在运行','传入参数%s'%i)
l.release()
print('恢复锁,释放资源')

if __name__ == '__main__':
lock = Lock()

for num in range(10):
Process(target=f, args=(lock,num)).start()

4. 共享内存

实现进程间的内存共享

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])

5. Manager

内存共享的另一种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)